/*-------------------------------------------------------------------------
 * schema_based_sharding.c
 *
 *	  Routines for schema-based sharding.
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "miscadmin.h"

#include "access/genam.h"
#include "catalog/catalog.h"
#include "catalog/pg_namespace_d.h"
#include "commands/extension.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"

#include "distributed/argutils.h"
#include "distributed/backend_data.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/listutils.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/shard_transfer.h"
#include "distributed/tenant_schema_metadata.h"
#include "distributed/worker_shard_visibility.h"


/* return value of CreateCitusMoveSchemaParams() */
typedef struct
{
	uint64 anchorShardId;
	uint32 sourceNodeId;
	char *sourceNodeName;
	uint32 sourceNodePort;
} CitusMoveSchemaParams;


static void UnregisterTenantSchemaGlobally(Oid schemaId, char *schemaName);
static List * SchemaGetNonShardTableIdList(Oid schemaId);
static void EnsureSchemaCanBeDistributed(Oid schemaId, List *schemaTableIdList);
static void EnsureTenantSchemaNameAllowed(Oid schemaId);
static void EnsureTableKindSupportedForTenantSchema(Oid relationId);
static void EnsureFKeysForTenantTable(Oid relationId);
static void EnsureSchemaExist(Oid schemaId);
static CitusMoveSchemaParams * CreateCitusMoveSchemaParams(Oid schemaId);
static uint64 TenantSchemaPickAnchorShardId(Oid schemaId);


/* controlled via citus.enable_schema_based_sharding GUC */
bool EnableSchemaBasedSharding = false;


const char *TenantOperationNames[TOTAL_TENANT_OPERATION] = {
	"undistribute_table",
	"alter_distributed_table",
	"colocate_with",
	"update_distributed_table_colocation",
	"set schema",
};


PG_FUNCTION_INFO_V1(citus_internal_unregister_tenant_schema_globally);
PG_FUNCTION_INFO_V1(citus_schema_distribute);
PG_FUNCTION_INFO_V1(citus_schema_undistribute);
PG_FUNCTION_INFO_V1(citus_schema_move);
PG_FUNCTION_INFO_V1(citus_schema_move_with_nodeid);

/*
 * ShouldUseSchemaBasedSharding returns true if schema given name should be
 * used as a tenant schema.
 */
bool
ShouldUseSchemaBasedSharding(char *schemaName)
{
	if (!EnableSchemaBasedSharding)
	{
		return false;
	}

	if (IsBinaryUpgrade)
	{
		return false;
	}

	/*
	 * Citus utility hook skips processing CREATE SCHEMA commands while an
	 * extension is being created. For this reason, we don't expect to get
	 * here while an extension is being created.
	 */
	Assert(!creating_extension);

	/*
	 * CREATE SCHEMA commands issued by internal backends are not meant to
	 * create tenant schemas but to sync metadata.
	 *
	 * On workers, Citus utility hook skips processing CREATE SCHEMA commands
	 * because we temporarily disable DDL propagation on workers when sending
	 * CREATE SCHEMA commands. For this reason, right now this check is a bit
	 * redundant but we prefer to keep it here to be on the safe side.
	 */
	if (IsCitusInternalBackend() || IsRebalancerInternalBackend())
	{
		return false;
	}

	/*
	 * Not do an oid comparison based on PG_PUBLIC_NAMESPACE because
	 * we want to treat "public" schema in the same way even if it's
	 * recreated.
	 */
	if (strcmp(schemaName, "public") == 0)
	{
		return false;
	}

	return true;
}


/*
 * ShouldCreateTenantSchemaTable returns true if we should create a tenant
 * schema table for given relationId.
 */
bool
ShouldCreateTenantSchemaTable(Oid relationId)
{
	if (IsBinaryUpgrade)
	{
		return false;
	}

	/*
	 * CREATE TABLE commands issued by internal backends are not meant to
	 * create tenant tables but to sync metadata.
	 */
	if (IsCitusInternalBackend() || IsRebalancerInternalBackend())
	{
		return false;
	}

	Oid schemaId = get_rel_namespace(relationId);
	return IsTenantSchema(schemaId);
}


/*
 * EnsureTableKindSupportedForTenantSchema ensures that given table's kind is
 * supported by a tenant schema.
 */
static void
EnsureTableKindSupportedForTenantSchema(Oid relationId)
{
	if (IsForeignTable(relationId))
	{
		ereport(ERROR, (errmsg("cannot create a foreign table in a distributed "
							   "schema")));
	}

	if (PartitionTable(relationId))
	{
		ErrorIfIllegalPartitioningInTenantSchema(PartitionParentOid(relationId),
												 relationId);
	}

	if (PartitionedTable(relationId))
	{
		List *partitionList = PartitionList(relationId);

		Oid partitionRelationId = InvalidOid;
		foreach_oid(partitionRelationId, partitionList)
		{
			ErrorIfIllegalPartitioningInTenantSchema(relationId, partitionRelationId);
		}
	}

	if (IsChildTable(relationId) || IsParentTable(relationId))
	{
		ereport(ERROR, (errmsg("tables in a distributed schema cannot inherit or "
							   "be inherited")));
	}
}


/*
 * EnsureFKeysForTenantTable ensures that all referencing and referenced foreign
 * keys are allowed for given table.
 */
static void
EnsureFKeysForTenantTable(Oid relationId)
{
	Oid tenantSchemaId = get_rel_namespace(relationId);
	int fKeyReferencingFlags = INCLUDE_REFERENCING_CONSTRAINTS | INCLUDE_ALL_TABLE_TYPES;
	List *referencingForeignKeys = GetForeignKeyOids(relationId, fKeyReferencingFlags);
	Oid foreignKeyId = InvalidOid;
	foreach_oid(foreignKeyId, referencingForeignKeys)
	{
		Oid referencingTableId = GetReferencingTableId(foreignKeyId);
		Oid referencedTableId = GetReferencedTableId(foreignKeyId);
		Oid referencedTableSchemaId = get_rel_namespace(referencedTableId);

		/* We allow foreign keys to a table in the same schema */
		if (tenantSchemaId == referencedTableSchemaId)
		{
			continue;
		}

		/*
		 * Allow foreign keys to the other schema only if the referenced table is
		 * a reference table.
		 */
		if (!IsCitusTable(referencedTableId) ||
			!IsCitusTableType(referencedTableId, REFERENCE_TABLE))
		{
			ereport(ERROR, (errmsg("foreign keys from distributed schemas can only "
								   "point to the same distributed schema or reference "
								   "tables in regular schemas"),
							errdetail("\"%s\" references \"%s\" via foreign key "
									  "constraint \"%s\"",
									  generate_qualified_relation_name(
										  referencingTableId),
									  generate_qualified_relation_name(referencedTableId),
									  get_constraint_name(foreignKeyId))));
		}
	}

	int fKeyReferencedFlags = INCLUDE_REFERENCED_CONSTRAINTS | INCLUDE_ALL_TABLE_TYPES;
	List *referencedForeignKeys = GetForeignKeyOids(relationId, fKeyReferencedFlags);
	foreach_oid(foreignKeyId, referencedForeignKeys)
	{
		Oid referencingTableId = GetReferencingTableId(foreignKeyId);
		Oid referencedTableId = GetReferencedTableId(foreignKeyId);
		Oid referencingTableSchemaId = get_rel_namespace(referencingTableId);

		/* We allow foreign keys from a table in the same schema */
		if (tenantSchemaId == referencingTableSchemaId)
		{
			continue;
		}

		/* Not allow any foreign keys from the other schema */
		ereport(ERROR, (errmsg("cannot create foreign keys to tables in a distributed "
							   "schema from another schema"),
						errdetail("\"%s\" references \"%s\" via foreign key "
								  "constraint \"%s\"",
								  generate_qualified_relation_name(referencingTableId),
								  generate_qualified_relation_name(referencedTableId),
								  get_constraint_name(foreignKeyId))));
	}
}


/*
 * CreateTenantSchemaTable creates a tenant table with given relationId.
 *
 * This means creating a single shard distributed table without a shard
 * key and colocating it with the other tables in its schema.
 */
void
CreateTenantSchemaTable(Oid relationId)
{
	if (!IsCoordinator())
	{
		/*
		 * We don't support creating tenant tables from workers. We could
		 * let ShouldCreateTenantSchemaTable() to return false to allow users
		 * to create a local table as usual but that would be confusing because
		 * it might sound like we allow creating tenant tables from workers.
		 * For this reason, we prefer to throw an error instead.
		 *
		 * Indeed, CreateSingleShardTable() would already do so but we
		 * prefer to throw an error with a more meaningful message, rather
		 * than saying "operation is not allowed on this node".
		 */
		ereport(ERROR, (errmsg("cannot create tables in a distributed schema from "
							   "a worker node"),
						errhint("Connect to the coordinator node and try again.")));
	}

	EnsureTableKindSupportedForTenantSchema(relationId);

	/*
	 * We don't expect this to happen because ShouldCreateTenantSchemaTable()
	 * should've already verified that; but better to check.
	 */
	Oid schemaId = get_rel_namespace(relationId);
	uint32 colocationId = SchemaIdGetTenantColocationId(schemaId);
	if (colocationId == INVALID_COLOCATION_ID)
	{
		ereport(ERROR, (errmsg("schema \"%s\" is not distributed",
							   get_namespace_name(schemaId))));
	}

	ColocationParam colocationParam = {
		.colocationParamType = COLOCATE_WITH_COLOCATION_ID,
		.colocationId = colocationId,
	};
	CreateSingleShardTable(relationId, colocationParam);
}


/*
 * ErrorIfIllegalPartitioningInTenantSchema throws an error if the
 * partitioning relationship between the parent and the child is illegal
 * because they are in different schemas while one of them is a tenant table.
 *
 * This function assumes that either the parent or the child are in a tenant
 * schema.
 */
void
ErrorIfIllegalPartitioningInTenantSchema(Oid parentRelationId, Oid partitionRelationId)
{
	if (get_rel_namespace(partitionRelationId) != get_rel_namespace(parentRelationId))
	{
		ereport(ERROR, (errmsg("partitioning within a distributed schema is not "
							   "supported when the parent and the child "
							   "are in different schemas")));
	}
}


/*
 * CreateTenantSchemaColocationId returns new colocation id for a tenant schema.
 */
uint32
CreateTenantSchemaColocationId(void)
{
	int shardCount = 1;
	int replicationFactor = 1;
	Oid distributionColumnType = InvalidOid;
	Oid distributionColumnCollation = InvalidOid;
	uint32 schemaColocationId = CreateColocationGroup(
		shardCount, replicationFactor, distributionColumnType,
		distributionColumnCollation);
	return schemaColocationId;
}


/*
 * SchemaGetNonShardTableIdList returns all nonshard relation ids
 * inside given schema.
 */
static List *
SchemaGetNonShardTableIdList(Oid schemaId)
{
	List *relationIdList = NIL;

	/* scan all relations in pg_class and return all tables inside given schema */
	Relation relationRelation = relation_open(RelationRelationId, AccessShareLock);

	ScanKeyData scanKey[1] = { 0 };
	ScanKeyInit(&scanKey[0], Anum_pg_class_relnamespace, BTEqualStrategyNumber,
				F_OIDEQ, ObjectIdGetDatum(schemaId));
	SysScanDesc scanDescriptor = systable_beginscan(relationRelation, ClassNameNspIndexId,
													true, NULL, 1, scanKey);

	HeapTuple heapTuple = NULL;
	while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
	{
		Form_pg_class relationForm = (Form_pg_class) GETSTRUCT(heapTuple);
		char *relationName = NameStr(relationForm->relname);
		Oid relationId = get_relname_relid(relationName, schemaId);

		if (!OidIsValid(relationId))
		{
			ereport(ERROR, errmsg("table %s is dropped by a concurrent operation",
								  relationName));
		}

		/* skip shards */
		if (RelationIsAKnownShard(relationId))
		{
			continue;
		}

		if (RegularTable(relationId) || PartitionTable(relationId) ||
			IsForeignTable(relationId))
		{
			relationIdList = lappend_oid(relationIdList, relationId);
		}
	}

	systable_endscan(scanDescriptor);
	relation_close(relationRelation, AccessShareLock);

	return relationIdList;
}


/*
 * EnsureSchemaCanBeDistributed ensures the schema can be distributed.
 * Caller should take required the lock on relations and the schema.
 *
 * It checks:
 *  - Schema name is in the allowed-list,
 *  - Schema does not depend on an extension (created by extension),
 *  - No extension depends on the schema (CREATE EXTENSION <ext> SCHEMA <schema>),
 *	- Some checks for the table for being a valid tenant table.
 */
static void
EnsureSchemaCanBeDistributed(Oid schemaId, List *schemaTableIdList)
{
	/* Ensure schema name is allowed */
	EnsureTenantSchemaNameAllowed(schemaId);

	/* Any schema owned by extension is not allowed */
	char *schemaName = get_namespace_name(schemaId);
	ObjectAddress *schemaAddress = palloc0(sizeof(ObjectAddress));
	ObjectAddressSet(*schemaAddress, NamespaceRelationId, schemaId);
	if (IsAnyObjectAddressOwnedByExtension(list_make1(schemaAddress), NULL))
	{
		ereport(ERROR, (errmsg("schema %s, which is owned by an extension, cannot "
							   "be distributed", schemaName)));
	}

	/* Extension schemas are not allowed */
	ObjectAddress *extensionAddress = FirstExtensionWithSchema(schemaId);
	if (extensionAddress)
	{
		char *extensionName = get_extension_name(extensionAddress->objectId);
		ereport(ERROR, (errmsg("schema %s cannot be distributed since it is the schema "
							   "of extension %s", schemaName, extensionName)));
	}

	Oid relationId = InvalidOid;
	foreach_oid(relationId, schemaTableIdList)
	{
		EnsureTenantTable(relationId, "citus_schema_distribute");
	}
}


/*
 * EnsureTenantTable ensures the table can be a valid tenant table.
 *  - Current user should be the owner of table,
 *  - Table kind is supported,
 *  - Referencing and referenced foreign keys for the table are supported,
 *	- Table is not owned by an extension,
 *  - Table should be Citus local or Postgres local table.
 */
void
EnsureTenantTable(Oid relationId, char *operationName)
{
	/* Ensure table owner */
	EnsureTableOwner(relationId);

	/* Check relation kind */
	EnsureTableKindSupportedForTenantSchema(relationId);

	/* Check foreign keys */
	EnsureFKeysForTenantTable(relationId);

	/* Check table not owned by an extension */
	ObjectAddress *tableAddress = palloc0(sizeof(ObjectAddress));
	ObjectAddressSet(*tableAddress, RelationRelationId, relationId);
	if (IsAnyObjectAddressOwnedByExtension(list_make1(tableAddress), NULL))
	{
		Oid schemaId = get_rel_namespace(relationId);
		char *tableName = get_namespace_name(schemaId);
		ereport(ERROR, (errmsg("schema cannot be distributed since it has "
							   "table %s which is owned by an extension",
							   tableName)));
	}

	/* Postgres local tables are allowed */
	if (!IsCitusTable(relationId))
	{
		return;
	}

	/* Only Citus local tables, amongst Citus table types, are allowed */
	if (!IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
	{
		ereport(ERROR, (errmsg("distributed schema cannot have distributed tables"),
						errhint("Undistribute distributed tables before "
								"'%s'.", operationName)));
	}
}


/*
 * EnsureTenantSchemaNameAllowed ensures if given schema is applicable for registering
 * as a tenant schema.
 */
static void
EnsureTenantSchemaNameAllowed(Oid schemaId)
{
	char *schemaName = get_namespace_name(schemaId);

	/* public schema is not allowed */
	if (strcmp(schemaName, "public") == 0)
	{
		ereport(ERROR, (errmsg("public schema cannot be distributed")));
	}

	/* information_schema schema is not allowed */
	if (strcmp(schemaName, "information_schema") == 0)
	{
		ereport(ERROR, (errmsg("information_schema schema cannot be distributed")));
	}

	/* pg_temp_xx and pg_toast_temp_xx schemas are not allowed */
	if (isAnyTempNamespace(schemaId))
	{
		ereport(ERROR, (errmsg("temporary schema cannot be distributed")));
	}

	/* pg_catalog schema is not allowed */
	if (IsCatalogNamespace(schemaId))
	{
		ereport(ERROR, (errmsg("pg_catalog schema cannot be distributed")));
	}

	/* pg_toast schema is not allowed */
	if (IsToastNamespace(schemaId))
	{
		ereport(ERROR, (errmsg("pg_toast schema cannot be distributed")));
	}
}


/*
 * EnsureSchemaExist ensures that schema exists. Caller is responsible to take
 * the required lock on the schema.
 */
static void
EnsureSchemaExist(Oid schemaId)
{
	if (!SearchSysCacheExists1(NAMESPACEOID, ObjectIdGetDatum(schemaId)))
	{
		ereport(ERROR, (errcode(ERRCODE_UNDEFINED_SCHEMA),
						errmsg("schema with OID %u does not exist", schemaId)));
	}
}


/*
 * UnregisterTenantSchemaGlobally removes given schema from the tenant schema
 * metadata table, deletes the colocation group of the schema and sends the
 * command to do the same on the workers.
 */
static void
UnregisterTenantSchemaGlobally(Oid schemaId, char *schemaName)
{
	uint32 tenantSchemaColocationId = SchemaIdGetTenantColocationId(schemaId);

	DeleteTenantSchemaLocally(schemaId);
	if (EnableMetadataSync)
	{
		SendCommandToWorkersWithMetadata(TenantSchemaDeleteCommand(schemaName));
	}

	DeleteColocationGroup(tenantSchemaColocationId);
}


/*
 * citus_internal_unregister_tenant_schema_globally, called by Citus drop hook,
 * unregisters the schema when a tenant schema is dropped.
 *
 * NOTE: We need to pass schema_name as an argument. We cannot use schema id
 * to obtain schema name since the schema would have already been dropped when this
 * udf is called by the drop hook.
 */
Datum
citus_internal_unregister_tenant_schema_globally(PG_FUNCTION_ARGS)
{
	PG_ENSURE_ARGNOTNULL(0, "schema_id");
	Oid schemaId = PG_GETARG_OID(0);

	PG_ENSURE_ARGNOTNULL(1, "schema_name");
	text *schemaName = PG_GETARG_TEXT_PP(1);
	char *schemaNameStr = text_to_cstring(schemaName);

	/*
	 * Skip on workers because we expect this to be called from the coordinator
	 * only via drop hook.
	 */
	if (!IsCoordinator())
	{
		PG_RETURN_VOID();
	}

	/* make sure that the schema is dropped already */
	HeapTuple namespaceTuple = SearchSysCache1(NAMESPACEOID, ObjectIdGetDatum(schemaId));
	if (HeapTupleIsValid(namespaceTuple))
	{
		ReleaseSysCache(namespaceTuple);
		ereport(ERROR, (errmsg("schema is expected to be already dropped "
							   "because this function is only expected to "
							   "be called from Citus drop hook")));
	}
	UnregisterTenantSchemaGlobally(schemaId, schemaNameStr);
	PG_RETURN_VOID();
}


/*
 * citus_schema_distribute gets a regular schema name, then converts it to a tenant
 * schema.
 */
Datum
citus_schema_distribute(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);
	EnsureCoordinator();

	Oid schemaId = PG_GETARG_OID(0);
	EnsureSchemaExist(schemaId);
	EnsureSchemaOwner(schemaId);

	/* Prevent concurrent table creation under the schema */
	LockDatabaseObject(NamespaceRelationId, schemaId, 0, AccessExclusiveLock);

	/*
	 * We should ensure the existence of the schema after taking the lock since
	 * the schema could have been dropped before we acquired the lock.
	 */
	EnsureSchemaExist(schemaId);
	EnsureSchemaOwner(schemaId);

	/* Return if the schema is already a tenant schema */
	char *schemaName = get_namespace_name(schemaId);
	if (IsTenantSchema(schemaId))
	{
		ereport(NOTICE, (errmsg("schema %s is already distributed", schemaName)));
		PG_RETURN_VOID();
	}

	/* Take lock on the relations and filter out partition tables */
	List *tableIdListInSchema = SchemaGetNonShardTableIdList(schemaId);
	List *tableIdListToConvert = NIL;
	Oid relationId = InvalidOid;
	foreach_oid(relationId, tableIdListInSchema)
	{
		/* prevent concurrent drop of the relation */
		LockRelationOid(relationId, AccessShareLock);
		EnsureRelationExists(relationId);

		/*
		 * Skip partitions as they would be distributed by the parent table.
		 *
		 * We should filter out partitions here before distributing the schema.
		 * Otherwise, converted partitioned table would change oid of partitions and its
		 * partition tables would fail with oid not exist.
		 */
		if (PartitionTable(relationId))
		{
			continue;
		}

		tableIdListToConvert = lappend_oid(tableIdListToConvert, relationId);
	}

	/* Makes sure the schema can be distributed. */
	EnsureSchemaCanBeDistributed(schemaId, tableIdListInSchema);

	ereport(NOTICE, (errmsg("distributing the schema %s", schemaName)));

	/* Create colocation id and then single shard tables with the colocation id */
	uint32 colocationId = CreateTenantSchemaColocationId();
	ColocationParam colocationParam = {
		.colocationParamType = COLOCATE_WITH_COLOCATION_ID,
		.colocationId = colocationId,
	};

	/*
	 * Collect foreign keys for recreation and then drop fkeys and create single shard
	 * tables.
	 */
	List *originalForeignKeyRecreationCommands = NIL;
	foreach_oid(relationId, tableIdListToConvert)
	{
		List *fkeyCommandsForRelation =
			GetFKeyCreationCommandsRelationInvolvedWithTableType(relationId,
																 INCLUDE_ALL_TABLE_TYPES);
		originalForeignKeyRecreationCommands = list_concat(
			originalForeignKeyRecreationCommands, fkeyCommandsForRelation);

		DropFKeysRelationInvolvedWithTableType(relationId, INCLUDE_ALL_TABLE_TYPES);
		CreateSingleShardTable(relationId, colocationParam);
	}

	/* We can skip foreign key validations as we are sure about them at start */
	bool skip_validation = true;
	ExecuteForeignKeyCreateCommandList(originalForeignKeyRecreationCommands,
									   skip_validation);

	/* Register the schema locally and sync it to workers */
	InsertTenantSchemaLocally(schemaId, colocationId);
	char *registerSchemaCommand = TenantSchemaInsertCommand(schemaId, colocationId);
	if (EnableMetadataSync)
	{
		SendCommandToWorkersWithMetadata(registerSchemaCommand);
	}

	PG_RETURN_VOID();
}


/*
 * citus_schema_undistribute gets a tenant schema name, then converts it to a regular
 * schema by undistributing all tables under it.
 */
Datum
citus_schema_undistribute(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);
	EnsureCoordinator();

	Oid schemaId = PG_GETARG_OID(0);
	EnsureSchemaExist(schemaId);
	EnsureSchemaOwner(schemaId);

	/* Prevent concurrent table creation under the schema */
	LockDatabaseObject(NamespaceRelationId, schemaId, 0, AccessExclusiveLock);

	/*
	 * We should ensure the existence of the schema after taking the lock since
	 * the schema could have been dropped before we acquired the lock.
	 */
	EnsureSchemaExist(schemaId);
	EnsureSchemaOwner(schemaId);

	/* The schema should be a tenant schema */
	char *schemaName = get_namespace_name(schemaId);
	if (!IsTenantSchema(schemaId))
	{
		ereport(ERROR, (errmsg("schema %s is not distributed", schemaName)));
	}

	ereport(NOTICE, (errmsg("undistributing schema %s", schemaName)));

	/* Take lock on the relations and filter out partition tables */
	List *tableIdListInSchema = SchemaGetNonShardTableIdList(schemaId);
	List *tableIdListToConvert = NIL;
	Oid relationId = InvalidOid;
	foreach_oid(relationId, tableIdListInSchema)
	{
		/* prevent concurrent drop of the relation */
		LockRelationOid(relationId, AccessShareLock);
		EnsureRelationExists(relationId);

		/*
		 * Skip partitions as they would be undistributed by the parent table.
		 *
		 * We should filter out partitions here before undistributing the schema.
		 * Otherwise, converted partitioned table would change oid of partitions and its
		 * partition tables would fail with oid not exist.
		 */
		if (PartitionTable(relationId))
		{
			continue;
		}

		tableIdListToConvert = lappend_oid(tableIdListToConvert, relationId);

		/* Only single shard tables are expected during the undistribution of the schema */
		Assert(IsCitusTableType(relationId, SINGLE_SHARD_DISTRIBUTED));
	}

	/*
	 * First, we need to delete schema metadata and sync it to workers. Otherwise,
	 * we would get error from `ErrorIfTenantTable` while undistributing the tables.
	 */
	UnregisterTenantSchemaGlobally(schemaId, schemaName);
	UndistributeTables(tableIdListToConvert);

	PG_RETURN_VOID();
}


/*
 * citus_schema_move moves the shards that belong to given distributed tenant
 * schema from one node to the other node by using citus_move_shard_placement().
 */
Datum
citus_schema_move(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);
	EnsureCoordinator();

	Oid schemaId = PG_GETARG_OID(0);
	CitusMoveSchemaParams *params = CreateCitusMoveSchemaParams(schemaId);

	DirectFunctionCall6(citus_move_shard_placement,
						UInt64GetDatum(params->anchorShardId),
						CStringGetTextDatum(params->sourceNodeName),
						UInt32GetDatum(params->sourceNodePort),
						PG_GETARG_DATUM(1),
						PG_GETARG_DATUM(2),
						PG_GETARG_DATUM(3));
	PG_RETURN_VOID();
}


/*
 * citus_schema_move_with_nodeid does the same as citus_schema_move(), but
 * accepts node id as parameter instead of hostname and port, hence uses
 * citus_move_shard_placement_with_nodeid().
 */
Datum
citus_schema_move_with_nodeid(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);
	EnsureCoordinator();

	Oid schemaId = PG_GETARG_OID(0);
	CitusMoveSchemaParams *params = CreateCitusMoveSchemaParams(schemaId);

	DirectFunctionCall4(citus_move_shard_placement_with_nodeid,
						UInt64GetDatum(params->anchorShardId),
						UInt32GetDatum(params->sourceNodeId),
						PG_GETARG_DATUM(1),
						PG_GETARG_DATUM(2));
	PG_RETURN_VOID();
}


/*
 * CreateCitusMoveSchemaParams is a helper function for
 * citus_schema_move() and citus_schema_move_with_nodeid()
 * that validates input schema and returns the parameters to be used in underlying
 * shard transfer functions.
 */
static CitusMoveSchemaParams *
CreateCitusMoveSchemaParams(Oid schemaId)
{
	EnsureSchemaExist(schemaId);
	EnsureSchemaOwner(schemaId);

	if (!IsTenantSchema(schemaId))
	{
		ereport(ERROR, (errmsg("schema %s is not a distributed schema",
							   get_namespace_name(schemaId))));
	}

	uint64 anchorShardId = TenantSchemaPickAnchorShardId(schemaId);
	if (anchorShardId == INVALID_SHARD_ID)
	{
		ereport(ERROR, (errmsg("cannot move distributed schema %s because it is empty",
							   get_namespace_name(schemaId))));
	}

	uint32 colocationId = SchemaIdGetTenantColocationId(schemaId);
	uint32 sourceNodeId = SingleShardTableColocationNodeId(colocationId);

	bool missingOk = false;
	WorkerNode *sourceNode = FindNodeWithNodeId(sourceNodeId, missingOk);

	CitusMoveSchemaParams *params = palloc0(sizeof(CitusMoveSchemaParams));
	params->anchorShardId = anchorShardId;
	params->sourceNodeId = sourceNodeId;
	params->sourceNodeName = sourceNode->workerName;
	params->sourceNodePort = sourceNode->workerPort;
	return params;
}


/*
 * TenantSchemaPickAnchorShardId returns the id of one of the shards
 * created in given tenant schema.
 *
 * Returns INVALID_SHARD_ID if the schema was initially empty or if it's not
 * a tenant schema.
 *
 * Throws an error if all the tables in the schema are concurrently dropped.
 */
static uint64
TenantSchemaPickAnchorShardId(Oid schemaId)
{
	uint32 colocationId = SchemaIdGetTenantColocationId(schemaId);
	List *tablesInSchema = ColocationGroupTableList(colocationId, 0);
	if (list_length(tablesInSchema) == 0)
	{
		return INVALID_SHARD_ID;
	}

	Oid relationId = InvalidOid;
	foreach_oid(relationId, tablesInSchema)
	{
		/*
		 * Make sure the relation isn't dropped for the remainder of
		 * the transaction.
		 */
		LockRelationOid(relationId, AccessShareLock);

		/*
		 * The relation might have been dropped just before we locked it.
		 * Let's look it up.
		 */
		Relation relation = RelationIdGetRelation(relationId);
		if (RelationIsValid(relation))
		{
			/* relation still exists, we can use it */
			RelationClose(relation);
			return GetFirstShardId(relationId);
		}
	}

	ereport(ERROR, (errmsg("tables in schema %s are concurrently dropped",
						   get_namespace_name(schemaId))));
}


/*
 * ErrorIfTenantTable errors out with the given operation name,
 * if the given relation is a tenant table.
 */
void
ErrorIfTenantTable(Oid relationId, const char *operationName)
{
	if (IsTenantSchema(get_rel_namespace(relationId)))
	{
		ereport(ERROR, (errmsg("%s is not allowed for %s because it belongs to "
							   "a distributed schema",
							   generate_qualified_relation_name(relationId),
							   operationName)));
	}
}
